Cargar datos en cluster

Normalmente, este paso no lo hacemos en nuestra sesión de análisis: los datos están distribuidos en un cluster originalmente. Para nuestro ejemplo, limpiamos y cargamos los datos en memoria:

library(tidyverse)
── Attaching packages ───────────────────────────────────────────── tidyverse 1.2.1 ──
✔ ggplot2 3.0.0     ✔ purrr   0.2.5
✔ tibble  1.4.2     ✔ dplyr   0.7.6
✔ tidyr   0.8.1     ✔ stringr 1.3.1
✔ readr   1.1.1     ✔ forcats 0.3.0
── Conflicts ──────────────────────────────────────────────── tidyverse_conflicts() ──
✖ dplyr::filter() masks stats::filter()
✖ dplyr::lag()    masks stats::lag()
limpiar <- function(lineas,...){
  df_lista <- str_split(lineas, " ") %>% 
    keep(function(x) x[1] != '#') %>%
    transpose %>%
    map(function(col) as.character(col)) 
  df <- data_frame(articulo = df_lista[[1]], 
                   categorias = df_lista[[2]]) 
  df
}
filtrado <- read_lines_chunked("../../datos/similitud/wiki-100000.txt",
                    skip = 1, callback = ListCallback$new(limpiar))

Consideramos los datos ya tokenizados (los tokens son las categorías):

articulos_df <- filtrado %>% bind_rows
articulos_df

Y registramos en el cluster (en este caso, corremos los scripts localmente):

library(sparklyr)

Attaching package: ‘sparklyr’

The following object is masked from ‘package:purrr’:

    invoke
config <- spark_config()
# configuración para modo local:
config$`sparklyr.shell.driver-memory` <- "2G" 
sc <- spark_connect(master = "local", config = config)
* Using Spark: 2.3.0
# normalmente no copiamos de nuestra sesión de R a un cluster! Para este ejemplo
# con datos chicos es posible:
articulos_wiki_tbl <- copy_to(sc, articulos_df, "articulos_wiki", overwrite = TRUE) 
articulos_wiki_tbl

Agrupamos los tokens en una lista:

art_agr <- articulos_wiki_tbl %>%
        group_by(articulo) %>%
        summarise(lista = collect_list(categorias)) 

Y binarizamos (la representación para usar la implementación de spark es de matriz rala: 1 cuando el token/shingle pertenece al documento, y 0 si no):

art_bin <- art_agr %>% 
        ft_count_vectorizer('lista', 'vector', binary = TRUE) 
# estimator
lsh_wiki_estimator <- ft_minhash_lsh(sc, 'vector', 'hashes', 
                           seed = 1227,
                           num_hash_tables = 5)
lsh_wiki_trans <-  ml_fit(lsh_wiki_estimator, art_bin)
art_bin <- ml_transform(lsh_wiki_trans, art_bin)
art_bin %>% head(5)
vec_1 <- art_bin %>% filter(articulo =='Alabama') %>% pull(vector)
similares <- ml_approx_nearest_neighbors(lsh_wiki_trans, 
              art_bin, vec_1[[1]], num_nearest_neighbors = 10) %>% 
              select(articulo, lista, distCol)
print(similares %>% collect)

Encontramos pares similares con un similarity join, por ejemplo:

art_bin <- art_bin %>% mutate(id = articulo)
pares_candidatos <- ml_approx_similarity_join(lsh_wiki_trans, art_bin, art_bin, 0.7,
  dist_col = "distCol") %>% filter(id_a != id_b)
pares_candidatos  %>% tally()
pares <- pares_candidatos %>% filter(distCol < 0.2)
pares %>% tally
pares <- pares %>% collect()

Por ejemplo

DT::datatable(pares %>% filter(str_detect(id_a, "poker") | str_detect(id_b, "poker")))

Nota: la implementación en spark de LSH utiliza solamente amplificación OR. Es posible usar suficientes hashes para obtener pares, y después filtrar los de la distancia que buscamos (¿Cómo implementar familias AND-OR)?

LS0tCnRpdGxlOiAiTFNIIHBhcmEgY2F0ZWdvcsOtYXMgZGUgV2lraXBlZGlhIGVuIFNwYXJrIgpvdXRwdXQ6IGh0bWxfbm90ZWJvb2sKLS0tCgoKIyMgQ2FyZ2FyIGRhdG9zIGVuIGNsdXN0ZXIKCk5vcm1hbG1lbnRlLCBlc3RlIHBhc28gbm8gbG8gaGFjZW1vcyBlbiBudWVzdHJhIHNlc2nDs24gZGUgYW7DoWxpc2lzOiBsb3MgZGF0b3MKZXN0w6FuIGRpc3RyaWJ1aWRvcyBlbiB1biBjbHVzdGVyIG9yaWdpbmFsbWVudGUuIFBhcmEgbnVlc3RybyBlamVtcGxvLCBsaW1waWFtb3MKeSBjYXJnYW1vcyBsb3MgZGF0b3MgZW4gbWVtb3JpYToKCmBgYHtyfQpsaWJyYXJ5KHRpZHl2ZXJzZSkKbGltcGlhciA8LSBmdW5jdGlvbihsaW5lYXMsLi4uKXsKICBkZl9saXN0YSA8LSBzdHJfc3BsaXQobGluZWFzLCAiICIpICU+JSAKICAgIGtlZXAoZnVuY3Rpb24oeCkgeFsxXSAhPSAnIycpICU+JQogICAgdHJhbnNwb3NlICU+JQogICAgbWFwKGZ1bmN0aW9uKGNvbCkgYXMuY2hhcmFjdGVyKGNvbCkpIAogIGRmIDwtIGRhdGFfZnJhbWUoYXJ0aWN1bG8gPSBkZl9saXN0YVtbMV1dLCAKICAgICAgICAgICAgICAgICAgIGNhdGVnb3JpYXMgPSBkZl9saXN0YVtbMl1dKSAKICBkZgp9CmZpbHRyYWRvIDwtIHJlYWRfbGluZXNfY2h1bmtlZCgiLi4vLi4vZGF0b3Mvc2ltaWxpdHVkL3dpa2ktMTAwMDAwLnR4dCIsCiAgICAgICAgICAgICAgICAgICAgc2tpcCA9IDEsIGNhbGxiYWNrID0gTGlzdENhbGxiYWNrJG5ldyhsaW1waWFyKSkKYGBgCgpDb25zaWRlcmFtb3MgbG9zIGRhdG9zIHlhIHRva2VuaXphZG9zIChsb3MgdG9rZW5zIHNvbiBsYXMgY2F0ZWdvcsOtYXMpOgoKYGBge3J9CmFydGljdWxvc19kZiA8LSBmaWx0cmFkbyAlPiUgYmluZF9yb3dzCmFydGljdWxvc19kZgpgYGAKCgpZIHJlZ2lzdHJhbW9zIGVuIGVsIGNsdXN0ZXIgKGVuIGVzdGUgY2FzbywgY29ycmVtb3MgbG9zIHNjcmlwdHMgbG9jYWxtZW50ZSk6CgpgYGB7cn0KbGlicmFyeShzcGFya2x5cikKY29uZmlnIDwtIHNwYXJrX2NvbmZpZygpCiMgY29uZmlndXJhY2nDs24gcGFyYSBtb2RvIGxvY2FsOgpjb25maWckYHNwYXJrbHlyLnNoZWxsLmRyaXZlci1tZW1vcnlgIDwtICIyRyIgIyBwYXJhIHBvZGVyIGhhY2VyIGNvbGxlY3QgZGUgcGFyZXMgbcOhcyBhZGVsYW50ZQpzYyA8LSBzcGFya19jb25uZWN0KG1hc3RlciA9ICJsb2NhbCIsIGNvbmZpZyA9IGNvbmZpZykKIyBub3JtYWxtZW50ZSBubyBjb3BpYW1vcyBkZSBudWVzdHJhIHNlc2nDs24gZGUgUiBhIHVuIGNsdXN0ZXIhIFBhcmEgZXN0ZSBlamVtcGxvCiMgY29uIGRhdG9zIGNoaWNvcyBlcyBwb3NpYmxlOgphcnRpY3Vsb3Nfd2lraV90YmwgPC0gY29weV90byhzYywgYXJ0aWN1bG9zX2RmLCAiYXJ0aWN1bG9zX3dpa2kiLCBvdmVyd3JpdGUgPSBUUlVFKSAKYXJ0aWN1bG9zX3dpa2lfdGJsCmBgYAoKQWdydXBhbW9zIGxvcyB0b2tlbnMgZW4gdW5hIGxpc3RhOgoKYGBge3J9CmFydF9hZ3IgPC0gYXJ0aWN1bG9zX3dpa2lfdGJsICU+JQogICAgICAgIGdyb3VwX2J5KGFydGljdWxvKSAlPiUKICAgICAgICBzdW1tYXJpc2UobGlzdGEgPSBjb2xsZWN0X2xpc3QoY2F0ZWdvcmlhcykpIApgYGAKClkgYmluYXJpemFtb3MgKGxhIHJlcHJlc2VudGFjacOzbiBwYXJhIHVzYXIgbGEgaW1wbGVtZW50YWNpw7NuIGRlIHNwYXJrIGVzCmRlIG1hdHJpeiByYWxhOiAxIGN1YW5kbyBlbCB0b2tlbi9zaGluZ2xlIHBlcnRlbmVjZSBhbCBkb2N1bWVudG8sIHkgMCBzaSBubyk6CgpgYGB7cn0KYXJ0X2JpbiA8LSBhcnRfYWdyICU+JSAKICAgICAgICBmdF9jb3VudF92ZWN0b3JpemVyKCdsaXN0YScsICd2ZWN0b3InLCBiaW5hcnkgPSBUUlVFKSAKCmBgYAoKYGBge3J9CiMgZXN0aW1hdG9yCmxzaF93aWtpX2VzdGltYXRvciA8LSBmdF9taW5oYXNoX2xzaChzYywgJ3ZlY3RvcicsICdoYXNoZXMnLCAKICAgICAgICAgICAgICAgICAgICAgICAgICAgc2VlZCA9IDEyMjcsCiAgICAgICAgICAgICAgICAgICAgICAgICAgIG51bV9oYXNoX3RhYmxlcyA9IDUpCmBgYAoKYGBge3J9CmxzaF93aWtpX3RyYW5zIDwtICBtbF9maXQobHNoX3dpa2lfZXN0aW1hdG9yLCBhcnRfYmluKQphcnRfYmluIDwtIG1sX3RyYW5zZm9ybShsc2hfd2lraV90cmFucywgYXJ0X2JpbikKYXJ0X2JpbiAlPiUgaGVhZCg1KQpgYGAKCgpgYGB7cn0KdmVjXzEgPC0gYXJ0X2JpbiAlPiUgZmlsdGVyKGFydGljdWxvID09J0FsYWJhbWEnKSAlPiUgcHVsbCh2ZWN0b3IpCnNpbWlsYXJlcyA8LSBtbF9hcHByb3hfbmVhcmVzdF9uZWlnaGJvcnMobHNoX3dpa2lfdHJhbnMsIAogICAgICAgICAgICAgIGFydF9iaW4sIHZlY18xW1sxXV0sIG51bV9uZWFyZXN0X25laWdoYm9ycyA9IDEwKSAlPiUgCiAgICAgICAgICAgICAgc2VsZWN0KGFydGljdWxvLCBsaXN0YSwgZGlzdENvbCkKcHJpbnQoc2ltaWxhcmVzICU+JSBjb2xsZWN0KQpgYGAKCkVuY29udHJhbW9zIHBhcmVzIHNpbWlsYXJlcyBjb24gdW4gKnNpbWlsYXJpdHkgam9pbiosIHBvciBlamVtcGxvOgoKYGBge3J9CmFydF9iaW4gPC0gYXJ0X2JpbiAlPiUgbXV0YXRlKGlkID0gYXJ0aWN1bG8pCnBhcmVzX2NhbmRpZGF0b3MgPC0gbWxfYXBwcm94X3NpbWlsYXJpdHlfam9pbihsc2hfd2lraV90cmFucywgYXJ0X2JpbiwgYXJ0X2JpbiwgMC43LAogIGRpc3RfY29sID0gImRpc3RDb2wiKSAlPiUgZmlsdGVyKGlkX2EgIT0gaWRfYikKcGFyZXNfY2FuZGlkYXRvcyAgJT4lIHRhbGx5KCkKYGBgCgpgYGB7cn0KcGFyZXMgPC0gcGFyZXNfY2FuZGlkYXRvcyAlPiUgZmlsdGVyKGRpc3RDb2wgPCAwLjIpCnBhcmVzICU+JSB0YWxseQpwYXJlcyA8LSBwYXJlcyAlPiUgY29sbGVjdCgpCmBgYAoKUG9yIGVqZW1wbG8KCmBgYHtyfQpEVDo6ZGF0YXRhYmxlKHBhcmVzICU+JSBmaWx0ZXIoc3RyX2RldGVjdChpZF9hLCAicG9rZXIiKSB8IHN0cl9kZXRlY3QoaWRfYiwgInBva2VyIikpKQpgYGAKCgoKTm90YTogbGEgaW1wbGVtZW50YWNpw7NuIGVuIHNwYXJrIGRlIExTSCB1dGlsaXphIHNvbGFtZW50ZSBhbXBsaWZpY2FjacOzbiBPUi4gCkVzIHBvc2libGUgdXNhciBzdWZpY2llbnRlcyBoYXNoZXMgcGFyYSBvYnRlbmVyIHBhcmVzLCB5IGRlc3B1w6lzIGZpbHRyYXIKbG9zIGRlIGxhIGRpc3RhbmNpYSBxdWUgYnVzY2Ftb3MgKMK/Q8OzbW8gaW1wbGVtZW50YXIgZmFtaWxpYXMgQU5ELU9SKT8KCg==